/*
   Riak (http://riak.basho.com/) database client for the go programming
   language.

   Basic Usage:
   package main
   import (
       "bytes"
       "riak"
   )

   func main() {
       r := riak.NewRiak("localhost:8098")
       // Key, Value example
       value =
       d = riak.NewDocument("test", "foo",

       // JSON Marshal example
       td := new(TestDocument)
       td.Foo = 1
       td.Bar = "Test Document"
       doc := riak.NewDocument("test", docId, make([]byte, 0))
       doc.JSONObject = td
       err := r.Put(doc)
       td2 := new(TestDocument)
       doc.JSONObject = td2
       err = r.Get(doc)
       err = r.Delete(bucketName, "test_key")


    }
*/
package riak


// Good example: http://github.com/c141charlie/riak.go/blob/master/riak.go
// use http_conn := http.NewClientConn(conn, nil) because need to add
// headers and things plus things like PUT, DELETE not just GET and POST.

import (
	"net"
	"http"
	"os"
	"io"
	"io/ioutil"
	"json"
	"bytes"
	"log"
	"strconv"
	"strings"
	"regexp"
	//"fmt"
)

// Datastucture to make building url easier
type URLBuilder struct {
	URL        string
	FirstParam bool
}

// Create and return an instance of URLBuilder
func NewURLBuilder(baseUrl string) *URLBuilder {
	ub := new(URLBuilder)
	ub.URL = baseUrl
	ub.FirstParam = true

	return ub
}

// Add params to url
func (self *URLBuilder) AddParam(name, param string) {
	seperator := "&"
	if self.FirstParam {
		seperator = "?"
		self.FirstParam = false
	}

	self.URL = self.URL + seperator + name + "=" + http.URLEscape(param)
}
// From: http://github.com/c141charlie/riak.go/blob/master/riak.go
type ClosingBuffer struct {
	*bytes.Buffer
}

func (cb *ClosingBuffer) Close() (err os.Error) {
	//we don't actually have to do anything here, since the buffer is just some data in memory
	//and the error is initialized to no-error
	return
}

// Definition of a link from one document to another
type Link struct {
	Bucket string
	Key    string
	Tag    string
}

// Wrapper for making Link easier.
func NewLink(bucket, key, tag string) *Link {
	l := new(Link)
	l.Bucket = bucket
	l.Key = key
	l.Tag = tag
	return l
}

// Define a basic document to reduce API calls
type Document struct {
	Bucket      string
	Key         string
	Value       []byte
	Links       []*Link
	ContentType string
	JSONObject  interface{}
}

func NewDocument(bucket, key string, value []byte) *Document {
	d := &Document{Bucket:bucket, Key:key, Value:value, ContentType:"application/octet-stream", JSONObject: nil}
	return d
}

// Mapping riak json bucket properties
type BucketProperties struct {
	Name            string
	N_val           int
	Allow_mult      bool
	Last_write_wins bool
	Precommit       []string
	Postcommit      []string
	Chash_keyfun    map[string]string
	Linkfun         map[string]string
	Old_vclock      int
	Young_vclock    int
	Big_vclock      int
	Small_vclock    int
}

// Returned json mapping of a bucket response
type Bucket struct {
	Props BucketProperties
	Keys  []string
}

// Basic riak datastructure
type Riak struct {
	Host string
    Debug bool
}

// Create a new instance of the object
func NewRiak(host string) *Riak {
	j := new(Riak)
	j.Host = host
    j.Debug = false
	return j
}

// Centralize string building
func (self *Riak) BuildURL(bucketName, path string) string {
	url := "http://" + self.Host + "/riak/" + bucketName + "/" + path
	return url
}
// Helper to wrap creating an http.Request
func (self *Riak) BuildRequest(method, url string) *http.Request {
	var req http.Request
	req.Method = method
	req.ProtoMajor = 1
	req.ProtoMinor = 1
	req.Close = true
	req.Header = map[string]string{
		"Content-Type":    "application/json",
		"X-Riak-ClientId": "goriak",
	}
	req.TransferEncoding = []string{"chunked"}
	req.URL, _ = http.ParseURL(url)
	return &req
}

func (self *Riak) MakeRequest(req *http.Request) (*http.Response, os.Error) {

	conn, err := net.Dial("tcp", "", self.Host)
	// Make sure didn't get error on connection
	if err != nil {
		log.Print("Connection failed to: ", self.Host)
		return nil, err
	}
	clientConn := http.NewClientConn(conn, nil)

	defer clientConn.Close()

	if err := clientConn.Write(req); err != nil {
		log.Print("Could not write request on client connection")
		return nil, err
	}

	return clientConn.Read()
}

// Does all the network IO ops
// Consider using panic / recover
// http://blog.golang.org/2010/08/defer-panic-and-recover.html
func (self *Riak) ProcessRequest(req *http.Request) ([]byte, os.Error) {
	var body []byte
	resp, err := self.MakeRequest(req)

	if err == nil {
		body, readError := ioutil.ReadAll(resp.Body)
		if readError == nil {
			return body, err
		} else {
			log.Print("Unable to read server response")
		}
	} else {
		log.Print("Request get failed not happy in Request function with"+
			"error: ",
			err)
	}
	return body, err
}


// Actually gets the bucket from the server and marshals the Bucket from
// the json that is returned
func (self *Riak) GetBucket(bucketName string) (*Bucket, os.Error) {
	bp := new(Bucket)
	url := self.BuildURL(bucketName, "")
	req := self.BuildRequest("GET", url)
	data, err := self.ProcessRequest(req)
	if err == nil {
		err = json.Unmarshal(data, &bp)
		if err != nil {
			log.Print("Could not unmarshal json for bucket error: ", err)
		}
	} else {
		log.Print("Request failed: ", err)
		return nil, err
	}
	return bp, err
}

// Wrapper around get since get will create the bucket
func (self *Riak) CreateBucket(bucketName string) (*Bucket, os.Error) {
	return self.GetBucket(bucketName)
}

// Removes a bucket from the server.
func (self *Riak) RemoveBucket(bucketName string) os.Error {
	url := self.BuildURL(bucketName, "")
	req := self.BuildRequest("DELETE", url)
	_, err := self.ProcessRequest(req)
	return err
}

// Add/update an object to a bucket
func (self *Riak) PutWDWReturn(doc *Document, w, dw int, returnObject bool) os.Error {
	url := self.BuildURL(doc.Bucket, doc.Key)
	ub := NewURLBuilder(url)

	if w > 0 {
		ub.AddParam("w", strconv.Itoa(w))
	}
	if dw > 0 {
		ub.AddParam("dw", strconv.Itoa(dw))
	}

	if returnObject {
		ub.AddParam("returnbody", "true")
	} else {
		ub.AddParam("returnbody", "false")
	}
	// "Link: </raw/hb/second>; riaktag=\"foo\", </raw/hb/third>; riaktag=\"bar\"" \
	linkStrings := make([]string, len(doc.Links))
	for i, ld := range doc.Links {
		// Need to populate comma if fist one
		linkStrings[i] = "</riak/" + ld.Bucket + "/" + ld.Key + ">; riaktag=\"" + ld.Tag + "\""
	}

	method := "PUT"
	if doc.Key == "" {
		method = "POST"
	}

	req := self.BuildRequest(method, ub.URL)

	if len(doc.Links) > 0 {
		req.Header["Link"] = strings.Join(linkStrings, ",")
	}
	if doc.JSONObject != nil {
		doc.ContentType = "application/json"
		data, err := json.Marshal(doc.JSONObject)
		doc.Value = data
		if err != nil {
			log.Print("Could not marshal into json: ", err)
			return err
		}
	}
	req.Header["Content-Type"] = doc.ContentType
	cb := &ClosingBuffer{bytes.NewBuffer(doc.Value)}
	var rc io.ReadCloser
	rc = cb
	req.Body = rc

	resp, err := self.MakeRequest(req)
    println("Sent bytes", len(doc.Value))
	location, ok := resp.Header["Location"]
	if ok {
		// TODO: regexp would be much cleaner
		parts := strings.Split(location, "/", -1)
		if len(parts) > 3 {
			key := parts[3]
			if doc.Key != key {
				doc.Key = key
			}
		}
	}
	if returnObject {
		value, err := ioutil.ReadAll(resp.Body)
		doc.Value = value
        if self.Debug {
            log.Println("After put returned: " + string(value))
        }
		if err == nil && doc.JSONObject != nil {
			err = json.Unmarshal(doc.Value, &doc.JSONObject)
			if err != nil {
				log.Print("Could not unmarshal json for object error: ", err)
				return err
			}
		}
	}

	return err
}

// Put this document up with defaults set
func (self *Riak) Put(doc *Document) os.Error {
	return self.PutWDWReturn(doc, -1, -1, true)
}

// Delete with number of nodes to confirm before returning
func (self *Riak) DeleteRW(bucketName, key string, rw int) os.Error {
	url := self.BuildURL(bucketName, key)
	ub := NewURLBuilder(url)
	if rw > 0 {
		ub.AddParam("r", strconv.Itoa(rw))
	}
	//log.Stdout(ub.URL)
	req := self.BuildRequest("DELETE", ub.URL)
	_, err := self.ProcessRequest(req)
	return err

}
// Remove an object from a bucket
func (self *Riak) Delete(bucketName, key string) os.Error {
	return self.DeleteRW(bucketName, key, -1)
}

// Read document with r number of nodes agreeing before returning
func (self *Riak) GetR(doc *Document, r int) os.Error {
	url := self.BuildURL(doc.Bucket, doc.Key)
    if(self.Debug) {
        log.Println("Get request URL: " + url)
    }
	ub := NewURLBuilder(url)
	if r > 0 {
		ub.AddParam("r", strconv.Itoa(r))
	}
	//log.Stdout(ub.URL)
	req := self.BuildRequest("GET", ub.URL)

	resp, err := self.MakeRequest(req)
	links, ok := resp.Header["Link"]
	if ok {
		// This should all be REGEXP but just haven't gotten around to
		// it yet.
		var LINK_SPLITTER = regexp.MustCompile("^<.*,")
		linkSplits := LINK_SPLITTER.FindAllString(links, -1)

		docLinks := make([]*Link, len(linkSplits))
		for i, p := range linkSplits {

			l := strings.TrimSpace(p)
			subparts := strings.Split(l, ";", -1)
			path := subparts[0]
			riaktag := subparts[1]

			pathparts := strings.Split(path, "/", -1)
			riaktagparts := strings.Split(riaktag, "=", -1)
			if len(pathparts) == 4 {
				bucket := pathparts[2]
				key := pathparts[3]
				tag := riaktagparts[1]
				docLinks[i] = NewLink(bucket, key, tag)
			}
		}
		doc.Links = docLinks
	}

	if err != nil {
		return err
	}
	// Set the content type
	contentType, _ := resp.Header["Content-Type"]
	doc.ContentType = contentType

	value, err := ioutil.ReadAll(resp.Body)
	if err != nil {
		return err
	}

	doc.Value = value
	if err == nil && doc.JSONObject != nil {
		err = json.Unmarshal(doc.Value, &doc.JSONObject)
		if err != nil && self.Debug {
			log.Print("Could not unmarshal json for object error: ", err)
		}
	}
	return err
}
// Load document
func (self *Riak) Get(doc *Document) os.Error {
	return self.GetR(doc, -1)
}

// Linkwalking https://wiki.basho.com/display/RIAK/REST+API#RESTAPI-Linkwalking
// and http://blog.basho.com/2010/02/24/link-walking-by-example/


// TODO: mapreduce, link-walking, ping, server-status

// EXPERIMENTAL CONCURRENT REQUEST API

// The existing API is a sequential access to making requests. This API allows
// for async requests to the data store. This can be done either by using a
// callback or by using a request id and a channel to return results when
// they come in.

// Callback interface where the code automatically marshals the JSON
type MarshalCallback interface {
	Callback(interface{}, os.Error)
}

// Callback which just returns the JSON as a string useful
type StringCallback interface {
	Callback(string, os.Error)
}

// Channel that responses will be returned on
type RiakResponse struct {
	RequestId int
	Error     os.Error
	Document  *Document
}


// Host is the host to connect to
type AsyncRiak struct {
	Host            string
	ResponseChannel chan *RiakResponse
	RequestIndex    int
	ResponseMap     map[int]chan *RiakResponse
}

func NewAsyncRiak(host string) *AsyncRiak {
	ar := new(AsyncRiak)
	ar.Host = host
	ar.ResponseMap = make(map[int]chan *RiakResponse)
	return ar
}

func (self *AsyncRiak) NextRequestId() int {
	self.RequestIndex++
	return self.RequestIndex
}

func (self *AsyncRiak) Connect() *Riak {
	r := NewRiak(self.Host)
	return r
}

// Normal async callback when the response is ready
func (self *AsyncRiak) MarshalGet(doc *Document, mc MarshalCallback) {
	go func() {
		r := self.Connect()
		err := r.Get(doc)
		mc.Callback(doc, err)
	}()
}

// The idea it we run a BackgroundPut request in the background and we will then
// call Wait once we have to wait for the response.
func (self *AsyncRiak) BackgroundPut(doc *Document) int {
	requestId := self.NextRequestId()
	self.ResponseMap[requestId] = make(chan *RiakResponse)
	go func() {
		r := self.Connect()
		err := r.Put(doc)
		rr := new(RiakResponse)
		rr.RequestId = requestId
		rr.Error = err
		rr.Document = doc
		self.ResponseMap[requestId] <- rr
	}()
	return requestId
}


// The idea it we run a Get request in the background and we will then
// call Wait once we have to wait for the response.
func (self *AsyncRiak) BackgroundGet(doc *Document) int {
	requestId := self.NextRequestId()
	self.ResponseMap[requestId] = make(chan *RiakResponse)
	go func() {
		r := self.Connect()
		err := r.Get(doc)
		rr := new(RiakResponse)
		rr.RequestId = requestId
		rr.Error = err
		rr.Document = doc
		self.ResponseMap[requestId] <- rr
	}()
	return requestId
}

// Wait until the request returns this repsonse
func (self *AsyncRiak) Wait(requestId int) *RiakResponse {
	return <-self.ResponseMap[requestId]
}

// TODO: CAP wrappers
